Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Runners, introduce Task class #4206

Merged
merged 26 commits into from
Nov 1, 2024
Merged

Refactor Runners, introduce Task class #4206

merged 26 commits into from
Nov 1, 2024

Conversation

merelcht
Copy link
Member

@merelcht merelcht commented Oct 3, 2024

Description

Introduced the Task class, which encapsulates what is actually run in each of the runners to make the Runners code more readable.

I've always found the code in runners a bit hard to navigate. The (simplified) flow before my refactor for running a node was:

graph TD
    A[run/run_node in runner.py] --> B[_run in sequential_runner.py]
    A[run/run_node in runner.py] --> C[_run in thread_runner.py]
    A[run/run_node in runner.py] --> D[_run in parallel_runner.py]
    B -->  A[run/run_node in runner.py]
    C -->     A[run/run_node in runner.py]
    D -->     A[run/run_node in runner.py]
    A--> F[run in node.py]
Loading

Now it's:

graph TD
    A[run in runner.py] --> B[_run in sequential_runner.py]
    A[run in runner.py] --> C[_run in thread_runner.py]
    A[run in runner.py] --> D[_run in parallel_runner.py]
    B --> E[execute in task.py]
    C --> E[execute in task.py]
    D --> E[execute in task.py]
    E --> F[run in node.py]
Loading

Development notes

  • Created Task which contains all that is needed to execute a Node.
  • Created _release_datasets() to remove duplicated code across the runners.
  • Moved all helper methods related to running a node from runner.py to the Task class.
  • Moved the helper methods in parallel_runner.py to task.py
  • Marked run_node() as deprecated, because it's replaced by Task.execute() and no longer called directly anywhere other than tests.

Developer Certificate of Origin

We need all contributions to comply with the Developer Certificate of Origin (DCO). All commits must be signed off by including a Signed-off-by line in the commit message. See our wiki for guidance.

If your PR is blocked due to unsigned commits, then you must follow the instructions under "Rebase the branch" on the GitHub Checks page for your PR. This will retroactively add the sign-off to all unsigned commits and allow the DCO check to pass.

Checklist

  • Read the contributing guidelines
  • Signed off each commit with a Developer Certificate of Origin (DCO)
  • Opened this PR as a 'Draft Pull Request' if it is work-in-progress
  • Updated the documentation to reflect the code changes
  • Added a description of this change in the RELEASE.md file
  • Added tests to cover my changes
  • Checked if this change will affect Kedro-Viz, and if so, communicated that with the Viz team

Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
# Conflicts:
#	kedro/runner/runner.py
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
@@ -75,21 +75,22 @@ def _run(

for exec_index, node in enumerate(nodes):
try:
run_node(node, catalog, hook_manager, self._is_async, session_id)
from kedro.runner.task import Task
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is needed because I refactored run_node in the runner to use Task and moved methods to Task as well. run_node isn't actually needed anymore, but removing it would be a breaking change. I could undo the changes to runner.py which removes the import from Task and then allows it to be imported inside the runner implementations again. The downside is that we'd have duplicated code in runner.py and task.py.

merelcht and others added 4 commits October 8, 2024 16:13
Signed-off-by: Merel Theisen <49397448+merelcht@users.noreply.github.com>
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
@merelcht merelcht requested a review from idanov October 9, 2024 12:36
@deepyaman
Copy link
Member

Is there a relevant issue for this? Nothing against the idea of introducing the "task" abstraction; just interested to better understand what motivates it.

@merelcht
Copy link
Member Author

Is there a relevant issue for this? Nothing against the idea of introducing the "task" abstraction; just interested to better understand what motivates it.

I'll update the description when the PR is ready for review.

merelcht and others added 4 commits October 15, 2024 12:40
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
Signed-off-by: Merel Theisen <49397448+merelcht@users.noreply.github.com>
@merelcht merelcht marked this pull request as ready for review October 15, 2024 13:31
@merelcht merelcht self-assigned this Oct 15, 2024
@merelcht merelcht changed the title [WIP] Refactor Runners, introduce Task Refactor Runners, introduce Task class Oct 15, 2024
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
@merelcht merelcht linked an issue Oct 15, 2024 that may be closed by this pull request
merelcht and others added 2 commits October 15, 2024 15:10
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
@noklam noklam self-requested a review October 16, 2024 13:36
@noklam
Copy link
Contributor

noklam commented Oct 16, 2024

In general not much concern the refactoring looks simple enough and is a better abstraction than the previous one. I would like to run the benchmark once #4210 is ready to make sure we don't run into memory/perf issue.

Side note: I think the current Task is a bit weird with node/hook manager/catalog, but I understand it's necessary for keeping it non-breaking so it's all good, maybe something to revise when we are closer to 0.20.0

merelcht and others added 3 commits October 17, 2024 14:16
…ional argument, and adding parallel as boolean flag

Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
Signed-off-by: Merel Theisen <merel.theisen@quantumblack.com>
Copy link
Contributor

@ElenaKhaustova ElenaKhaustova left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks much cleaner now, thank you @merelcht!

Left one minor suggestion.

kedro/runner/runner.py Show resolved Hide resolved
Copy link
Contributor

@noklam noklam left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR looks good, I still want to wait #4210 to benchmark the runner if it's not too urgent.

Going through the runner code again, I have some thought (not specific to the refactoring).

Q1: With modern Python like asyncio, the first question I have is are we really doing async in Kedro? I see most of the async references are associated with threading, but I don't think they are the same thing. This maybe something that we could consider and I think in general async is simpler and was designed exactly for I/O taskes. Things may be a bit tricky since Python 3.13 start having a GIL free thread.

Q2: is_async in SequentialRunner is only for a limited scope, i.e. if nodes has mulitiple dataset, the order of loading doesn't matter and can be loaded in an async manner (Why isn't it the default already?)

Q3: There is another level of async, which is at multi-node level where each node is being executed asynchronously, again async maybe a simpler solution.

@merelcht
Copy link
Member Author

The PR looks good, I still want to wait #4210 to benchmark the runner if it's not too urgent.

Yes of course! We can definitely wait for that.

Going through the runner code again, I have some thought (not specific to the refactoring).

Q1: With modern Python like asyncio, the first question I have is are we really doing async in Kedro? I see most of the async references are associated with threading, but I don't think they are the same thing. This maybe something that we could consider and I think in general async is simpler and was designed exactly for I/O taskes. Things may be a bit tricky since Python 3.13 start having a GIL free thread.

Q2: is_async in SequentialRunner is only for a limited scope, i.e. if nodes has mulitiple dataset, the order of loading doesn't matter and can be loaded in an async manner (Why isn't it the default already?)

Q3: There is another level of async, which is at multi-node level where each node is being executed asynchronously, again async maybe a simpler solution.

These are excellent points, thanks @noklam ! I haven't had time to continue refactoring, but I will definitely take this on as part of it.

@merelcht
Copy link
Member Author

merelcht commented Oct 30, 2024

@noklam I don't know if I did this correct by I ran asv run (against main) and then asv show main and got:

benchmark_runner.RunnerMemorySuite.mem_runners [M-HLJY4F7K07/virtualenv-py3.11-kedro-datasets[pandas]]
  1/3 failed
  ================== ========
        runner
  ------------------ --------
   SequentialRunner     0
     ThreadRunner     failed
    ParallelRunner      0
  ================== ========
  started: 2024-10-30 15:53:21, duration: 16.2s

benchmark_runner.RunnerMemorySuite.peakmem_runners [M-HLJY4F7K07/virtualenv-py3.11-kedro-datasets[pandas]]
  1/3 failed
  ================== ========
        runner
  ------------------ --------
   SequentialRunner    102M
     ThreadRunner     failed
    ParallelRunner    98.1M
  ================== ========
  started: 2024-10-30 15:53:37, duration: 16.1s

benchmark_runner.RunnerTimeSuite.time_compute_bound_runner [M-HLJY4F7K07/virtualenv-py3.11-kedro-datasets[pandas]]
  1/3 failed
  ================== ============
        runner
  ------------------ ------------
   SequentialRunner   7.34±0.01s
     ThreadRunner       failed
    ParallelRunner    2.03±0.3s
  ================== ============
  started: 2024-10-30 15:53:53, duration: 1.11m

benchmark_runner.RunnerTimeSuite.time_io_bound_runner [M-HLJY4F7K07/virtualenv-py3.11-kedro-datasets[pandas]]
  1/3 failed
  ================== ============
        runner
  ------------------ ------------
   SequentialRunner   20.2±0.01s
     ThreadRunner       failed
    ParallelRunner    3.23±0.01s
  ================== ============
  started: 2024-10-30 15:54:25, duration: 1.90m

Then I did asv run main..runners -b runner --step 2 and asv show runners and got:

benchmark_runner.RunnerMemorySuite.mem_runners [M-HLJY4F7K07/virtualenv-py3.11-kedro-datasets[pandas]]
  1/3 failed
  ================== ========
        runner
  ------------------ --------
   SequentialRunner     0
     ThreadRunner     failed
    ParallelRunner      0
  ================== ========
  started: 2024-10-30 16:14:54, duration: 1.91m

benchmark_runner.RunnerMemorySuite.peakmem_runners [M-HLJY4F7K07/virtualenv-py3.11-kedro-datasets[pandas]]
  1/3 failed
  ================== ========
        runner
  ------------------ --------
   SequentialRunner    102M
     ThreadRunner     failed
    ParallelRunner    98.3M
  ================== ========
  started: 2024-10-30 16:16:49, duration: 15.9s

benchmark_runner.RunnerTimeSuite.time_compute_bound_runner [M-HLJY4F7K07/virtualenv-py3.11-kedro-datasets[pandas]]
  1/3 failed
  ================== ============
        runner
  ------------------ ------------
   SequentialRunner   7.47±0.07s
     ThreadRunner       failed
    ParallelRunner    2.06±0.01s
  ================== ============
  started: 2024-10-30 16:17:05, duration: 1.07m

benchmark_runner.RunnerTimeSuite.time_io_bound_runner [M-HLJY4F7K07/virtualenv-py3.11-kedro-datasets[pandas]]
  1/3 failed
  ================== ============
        runner
  ------------------ ------------
   SequentialRunner   20.2±0.01s
     ThreadRunner       failed
    ParallelRunner    3.23±0.01s
  ================== ============
  started: 2024-10-30 16:17:36, duration: 1.90m

So overall it looks like there isn't really a difference in performance.

From the pipeline test: https://github.com/kedro-org/kedro/actions/runs/11594561241/job/32281088073?pr=4206 you can also see there's hardly a difference.

noklam and others added 3 commits October 31, 2024 12:36
Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>
Signed-off-by: Nok Lam Chan <nok.lam.chan@quantumblack.com>
@merelcht merelcht requested a review from noklam October 31, 2024 16:56
@merelcht merelcht merged commit 18bde07 into main Nov 1, 2024
34 checks passed
@merelcht merelcht deleted the runners branch November 1, 2024 10:48
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Refactor Runners
5 participants